Audioデータをクラウドに送ってみました。 MQTT + Amazon Kinesis Data Firehose + S3 (JSON)
1 はじめに
CX事業本部の平内(SIN)です。
今回は、エッジ側のAudioデータをクラウドへ送信する要領を確認してみました。
送信の方法や、保存先は色々考えられますが、手始めにMQTTで送信し、Amazon Kinesis Data Firehose経由で、S3に保存する形を試してみました。
また、保存されたAudioデータから時間を指定して、wavファイルを生成するLambda関数も書いてみました。
確認が容易なように、とりあえず、Audioデータをターゲットとしていますが、連続するストリームを扱うという意味では、Audioに限らず応用ができるのではと考えています。
2 Audio入力
エッジ側は、RaspberryPiとしたのですが、デフォルトでAudio入力の機能が無いので、Webカメラをつないで、Audio入力に使用しました。
Webカメラ(C920)を接続して、arecordコマンドでAudioデバイスを一覧すると、device 0で認識されていることが分かります。
$ arecord -l **** List of CAPTURE Hardware Devices **** card 1: C920 [HD Pro Webcam C920], device 0: USB Audio [USB Audio] Subdevices: 1/1 Subdevice #0: subdevice #0
このデバイスIDを使用して、下記のコードで、得られる情報から、C920のAudio入力は、サンプルレートが、32KHz 、チャンネル数が2ということが分かります。
device_info.py
import pyaudio p = pyaudio.PyAudio() info = p.get_device_info_by_index(0) print(info)
$ python3 device_info.py ・・・略) {'index': 0, 'structVersion': 2, 'name': 'HD Pro Webcam C920: USB Audio (hw:1,0)', \ 'hostApi': 0, 'maxInputChannels': 2, 'maxOutputChannels': 0, \ 'defaultLowInputLatency': 0.01196875, 'defaultLowOutputLatency': -1.0, \ 'defaultHighInputLatency': 0.048, 'defaultHighOutputLatency': -1.0, \ 'defaultSampleRate': 32000.0}
なお、pyaudioのインストールは、以下で行いました。
brew install portaudio pip3 install pyaudio
3 録音
下記のコードは、Audio入力から10秒間録音し、sample.wavを生成するものです。
ストリームのバッファサイズをサンプルレートと同じにしているので、ループは、1秒毎に処理されます。ストリームから取得したデータは、チャンネルを1つ削減し、また、1/4に間引くことで、雑にサンプルレートを8Khzに変換しています。
チャンネル及びサンプルレートを削減したのは、事後、クラウドへ送信するデータ量を少し節約しようという意図です。
取得したデータは、waveライブラリを使用して、ヘッダを追加してwavファイルに保存しています。
record.py
import pyaudio import wave import numpy as np DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート CHUNK = SAMPLE_RATE # 1秒ごとに取得する FORMAT = pyaudio.paInt16 RECORD_SECONDS = 10 # 10秒間録音する # open stream p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) # recording print("recording ...") frames = [] for _ in range(0, int(SAMPLE_RATE / CHUNK * RECORD_SECONDS)): # 1秒分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() frames.append(data) print ("data size:{}".format(len(data))) data = b''.join(frames) print("done.") # close strema stream.stop_stream() stream.close() p.terminate() # save CHANNELS = 1 # 1ch SAMPLE_RATE = 8000 # 8kHz file_name = "./sample.wav" wf = wave.open(file_name, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(SAMPLE_RATE) wf.writeframes(data) wf.close()
4 エッジ側のコード
1秒に一回のタイミングで、AudioデータをMQTTで送信しているコードです。
AudioのRAWデータは、16K/secでしたが、テキスト化することで、サイズは、22K/sec程度になっています。
index.py
import pyaudio from producer import Producer import numpy as np DEVICE_INDEX = 0 CHANNELS = 2 SAMPLE_RATE = 32000 # サンプルレート FORMAT = pyaudio.paInt16 CHUNK = SAMPLE_RATE # 1秒ごとに取得する # open stream p = pyaudio.PyAudio() stream = p.open(format = FORMAT, channels = CHANNELS, rate = SAMPLE_RATE, input = True, input_device_index = DEVICE_INDEX, frames_per_buffer = CHUNK) producer = Producer() try: print("start ...") while True: # 1秒分のデータ読み込み data = stream.read(CHUNK) # numpy配列に変換 data = np.frombuffer(data, dtype="int16") # チャンネル 2ch -> 1ch data = data[0::2] # サンプルレート 32000Hz -> 8000Hz data = data[0::4] # byteに戻す data = data.tobytes() producer.send(data) except: stream.stop_stream() stream.close() p.terminate()
producer.py
from mqtt import Mqtt import json from datetime import datetime import base64 class Producer(): def __init__(self): self.__topic = "topic/audio_transmission" root_ca = "./certs/RootCA.pem" key = "./certs/xxxxxxxx-private.pem.key" cert = "./certs/xxxxxxxx-certificate.pem.crt" endpoint = "xxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com" self.__mqtt = Mqtt(root_ca, key, cert, endpoint) def send(self, data): now = datetime.now().strftime('%Y-%m-%dT%H:%M:%S') b64_data = base64.b64encode(data) raw_data = b64_data.decode() payload = { "timestamp": now, "raw_data": raw_data } self.__mqtt.publish(self.__topic, json.dumps(payload))
IoT Coreのコンソールで、publishされたデータを確認している様子です。
5 Amazon Kinesis Data Firehose
Amazon Kinesis Data Firehoseは、audio_transmission_firehose_streamという名前で作成し、特に、コンバートや圧縮などは設定せず、destinationをS3バケットとしています。
なお、Buffer sizeは、10M、Buffer intervalは60secとしました。この設定により、1秒ごとに送られくるデータを、1分毎に1ファイルとしてS3に保存されることになります。
1秒分のデータは、約22Kなので、約1.4M(22k*60)以上のバッファにしておけば、Buffer Intervalが先にトリガーされるため、1分毎に出力されるという想定です。
もし、要件的に、あまりリアルタイム性が必要ない場合は、1ファイルに格納するデータ量を大きく取ることで、S3への書き込み回数は節約できることになります。
6 Rule
IoT Coreでは、ルールエンジンを設定して、メッセージブローカーに到着したデータをAmazon Kinesis Data Firehoseに送っています。
セパレータに「改行」を指定して、秒単位のデータを1行ごと分離できるようにしました。
7 S3
S3に出力されたデータです。年月日のプレフィックスと秒数まで入ったKey名で保存されていることが分かります。
1つのファイルを見ると、1秒ごとのデータが60行(60秒分)格納されているのが確認できます。ここから対象秒のRAWデータが取り出せます。
ここまでで、Audioデータの送信は完了となります。
8 Lambda
ここからは、S3に蓄積されたデータから、データを取り出す一例として、Lambdaを書いてみました。
この関数は、取得開始時間、期間(秒)及び、出力ファイル名を与えて実行します。
指定された時間から、S3上のオブジェクト名を検索し、期間に該当するものをダウンロードして、RAWデータをデコードしています。また、RAWデータは、ヘッダを追加してwavファイルとして、S3にアップロードされます。
lambda_function.py
import json import datetime import os import boto3 import base64 import wave def lambda_handler(event, context): BUCKET_NAME = os.environ['BUCKET_NAME'] output_filename = event["output_filename"] # 出力ファイル名 start_time_str = event["start_time"] # 開始時間(YYYY-mm-dd HH:MM:SS) period_sec = int(event["period_sec"]) # 期間(秒) print("start_time:{} period_sec:{}".format(start_time_str, period_sec)) # 開始時間と終了時間(JST) start_time_jst = datetime.datetime.strptime(start_time_str, '%Y-%m-%d %H:%M:%S') end_time_jst = start_time_jst + datetime.timedelta(seconds=period_sec) print("JST {} - {} ".format(start_time_jst, end_time_jst)) # S3上のオブジェクト名を検索するため # 開始時間と終了時間(UTC) ファイル検索用なので、開始時間は、1分前とする start_time_utc = start_time_jst + datetime.timedelta(hours=-9) end_time_utc = start_time_utc + datetime.timedelta(seconds=period_sec) start_time_utc = start_time_utc + datetime.timedelta(minutes=-1) print("UTC {} - {}".format(start_time_utc, end_time_utc)) s3client = boto3.client('s3') # 当日分のオブジェクト名を列挙する prefix = "{:4d}/{:02d}/{:02d}".format( start_time_utc.year, start_time_utc.month, start_time_utc.day ) response = s3client.list_objects_v2( Bucket = BUCKET_NAME, Prefix = prefix ) # オブジェクト名から、対象期間ヒットするオブジェクトを列挙する target_keys = [] if("Contents" in response): for content in response["Contents"]: # オブジェクト名からtimestampを取得する key = content["Key"] tmp = key.split('/')[4].split('-') year = int(tmp[2]) month = int(tmp[3]) day = int(tmp[4]) hour = int(tmp[5]) minute = int(tmp[6]) second = int(tmp[7]) dt = datetime.datetime(year, month, day, hour, minute, second, microsecond=0) if(start_time_utc <= dt and dt <= end_time_utc): target_keys.append(key) target_keys.sort() # オブジェクトをダウンロードして、timestampが取得期間ヒットするRAWデータを取得する frames = [] for key in target_keys: body = s3client.get_object(Bucket=BUCKET_NAME, Key=key)['Body'].read() # 1行が、1秒分のデータとなっている lines = body.decode().split('\n') for line in lines: if(line==''): continue s = json.loads(line) timestamp = datetime.datetime.strptime(s["timestamp"], '%Y-%m-%dT%H:%M:%S') # 取得期間のデータは、framesに追加する if(start_time_jst <= timestamp and timestamp <= end_time_jst): # テキストデータをバイナリに戻す b64_data = s["raw_data"].encode() frames.append(base64.b64decode(b64_data)) data = b''.join(frames) # RAWデータをwavファイルとして保存する CHANNELS = 1 # 1ch SAMPLE_RATE = 8000 # 8kHz SAMPLE_WIDTH = 2 tmp_file_name = "/tmp/tmp.wav" wf = wave.open(tmp_file_name, 'wb') wf.setnchannels(CHANNELS) wf.setsampwidth(SAMPLE_WIDTH) wf.setframerate(SAMPLE_RATE) wf.writeframes(data) wf.close() s3client.upload_file(tmp_file_name, BUCKET_NAME, output_filename) return {}
AWSコンソールから、取得開始時間、期間及び、出力ファイル名を指定して関数を実行している様子です。
S3上に指定したファイル名でwavファイルがアップロードされます。
9 最後に
今回は、MQTTを使用してAudioデータをクラウドに送信する要領を試してみました。
Audioデータをテキスト化するなど、ちょっとオーバーヘッドな感じですが、一応、エッジ側のタイムスタンプが入っています。MQTTにJSON形式で送っているため、付随するアトリビュートがあれば、追加してルールで捌くなどの処理も可能だと思います。
全てのコードは下記に置きました
https://github.com/furuya02/AudioTransmission/tree/main/sample_1